﻿#region

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Client.ChannelInterface;
using Client.Interface;
using Client.Messages;
using Client.Queries;
using Client.Tools;

#endregion

namespace Client.Core
{
    /// <summary>
    ///     Composite client. Aggregates/distributes data from/to multiple nodes
    /// </summary>
    internal class Aggregator : ICacheClient
    {
        // Monitor taken by ExecuteTransaction so that this instance never runs two transactions at the same time
        private readonly object _transactionSync = new object();


        // Offset read and advanced by GenerateUniqueIds to choose the first node that is asked for ids
        private int _startIndex;
        // The clients aggregated by this instance; the methods of this class index the list by node number
        public IList<CacheClient> CacheClients { get; } = new List<CacheClient>();


        /// <summary>
        ///     Ask every node (in parallel) for its server description, check that the schemas declared by
        ///     the first node are identical on all the other nodes and combine the answers.
        /// </summary>
        /// <returns>A <see cref="ClusterInformation" /> built from one response per node</returns>
        /// <exception cref="CacheException">
        ///     Raised by a node, or raised here when two nodes declare different schemas
        /// </exception>
        public ClusterInformation GetClusterInformation()
        {
            var responses = new ServerDescriptionResponse[CacheClients.Count];

            try
            {
                Parallel.For(0, CacheClients.Count, i => { responses[i] = CacheClients[i].GetServerDescription(); });
            }
            catch (AggregateException e)
            {
                // Surface the original failure whatever its type and keep its stack trace. Swallowing it
                // would leave null entries in responses and the schema check below would fail with a
                // NullReferenceException that hides the real cause.
                ExceptionDispatchInfo.Capture(e.InnerException ?? e).Throw();
            }


            // check that all schemas are identical (the first node is used as reference)

            var reference = responses[0];
            for (var i = 1; i < CacheClients.Count; i++)
            {
                foreach (var typeDescription in reference.KnownTypesByFullName)
                {
                    if (!responses[i].KnownTypesByFullName[typeDescription.Key].Equals(typeDescription.Value))
                    {
                        throw new CacheException(
                            $"servers have different schemas: {responses[0].ServerProcessInfo.Host} <> {responses[i].ServerProcessInfo.Host} ");
                    }
                }
            }

            return new ClusterInformation(responses);
        }


        //TODO add UnitTest (coverage)
        /// <summary>
        ///     Collect the server log of every node (in parallel) and wrap the answers in a single
        ///     <see cref="ServerLog" />.
        /// </summary>
        /// <param name="lastLines">Value forwarded unchanged to each node's GetServerLog</param>
        /// <returns>A <see cref="ServerLog" /> built from one response per node</returns>
        public ServerLog GetLog(int lastLines)
        {
            var responses = new LogResponse[CacheClients.Count];

            try
            {
                Parallel.For(0, CacheClients.Count, i => { responses[i] = CacheClients[i].GetServerLog(lastLines); });
            }
            catch (AggregateException e)
            {
                // Report the first failure whatever its type (stack trace preserved) instead of returning
                // a log built from a partially filled array that contains null entries.
                ExceptionDispatchInfo.Capture(e.InnerException ?? e).Throw();
            }


            return new ServerLog(responses);
        }

        /// <summary>
        ///     Forward the read-only mode switch to every node in parallel.
        /// </summary>
        /// <param name="rw">
        ///     Value passed unchanged to each node's SetReadonlyMode.
        ///     NOTE(review): the name suggests that true switches back to read-write; confirm against CacheClient.
        /// </param>
        public void SetReadonlyMode(bool rw = false)
        {
            try
            {
                Parallel.For(0, CacheClients.Count, i => { CacheClients[i].SetReadonlyMode(rw); });
            }
            catch (AggregateException e)
            {
                // A node that could not be switched must not be ignored silently, whatever the exception
                // type: rethrow the first failure with its original stack trace (same policy as Stop/Dump).
                ExceptionDispatchInfo.Capture(e.InnerException ?? e).Throw();
            }
        }


        //TODO add unit test (coverage)
        /// <summary>
        ///     Ask every node, in parallel, to drop its database.
        /// </summary>
        public void DropDatabase()
        {
            try
            {
                Parallel.For(0, CacheClients.Count, i => { CacheClients[i].DropDatabase(); });
            }
            catch (AggregateException e)
            {
                // Any failure is reported (not only CacheException): otherwise the caller would believe the
                // whole cluster was dropped while one node still holds its data.
                ExceptionDispatchInfo.Capture(e.InnerException ?? e).Throw();
            }
        }

        /// <summary>
        ///     Generate unique ids by splitting the requested quantity between the nodes, starting with the
        ///     node designated by the rotating start offset.
        /// </summary>
        /// <param name="generatorName">Name forwarded to each node's GenerateUniqueIds</param>
        /// <param name="quantity">Number of ids requested</param>
        /// <returns>The generated ids, sorted in ascending order</returns>
        public int[] GenerateUniqueIds(string generatorName, int quantity = 1)
        {
            var nodeCount = CacheClients.Count;

            var ids = new List<int>();

            // quantity of ids to be generated by each node
            var quantityToGenerate = new int[nodeCount];

            var quantityPerShard = quantity / nodeCount + 1;

            var quantityLeft = quantity;

            for (var i = _startIndex; i < nodeCount + _startIndex; i++)
            {
                quantityToGenerate[i % nodeCount] = quantityPerShard;
                quantityLeft -= quantityPerShard;
                if (quantityLeft < quantityPerShard) quantityPerShard = quantityLeft;

                // In order to uniformly distribute objects on the multiple nodes always advance the start offset.
                // Otherwise if we ask n times for one id, the same node would answer and the sharding system
                // would allocate this node for all objects.
                if (quantityLeft == 0)
                {
                    // Only the value modulo the node count is ever used: keep it in range so the offset
                    // cannot grow without bound (and eventually overflow) on a long-lived client.
                    _startIndex = (i + 1) % nodeCount;
                    break;
                }
            }


            try
            {
                Parallel.For(0, nodeCount, idx =>
                {
                    if (quantityToGenerate[idx] <= 0) return;

                    var list = CacheClients[idx].GenerateUniqueIds(generatorName, quantityToGenerate[idx]);
                    lock (ids)
                    {
                        ids.AddRange(list);
                    }
                });
            }
            catch (AggregateException e)
            {
                // Rethrow any failure (stack trace preserved): swallowing it would silently hand back
                // fewer ids than the caller asked for.
                ExceptionDispatchInfo.Capture(e.InnerException ?? e).Throw();
            }

            ids.Sort();

            return ids.Take(quantity).ToArray();
        }


        /// <summary>
        ///     Import a dump on every node. Stages 0, 1 and 2 are run one after the other, each stage being
        ///     executed on all nodes in parallel before the next one starts. If any stage fails, stage 3 is
        ///     run on all nodes and the failure is reported to the caller.
        ///     NOTE(review): stage 3 is presumably the rollback/cleanup stage; confirm against CacheClient.
        /// </summary>
        /// <param name="path">Dump path forwarded unchanged to every stage</param>
        public void ImportDump(string path)
        {
            try
            {
                Parallel.For(0, CacheClients.Count, i => { CacheClients[i].ImportDumpStage0(path); });

                Parallel.For(0, CacheClients.Count, i => { CacheClients[i].ImportDumpStage1(path); });

                Parallel.For(0, CacheClients.Count, i => { CacheClients[i].ImportDumpStage2(path); });
            }
            catch (AggregateException e)
            {
                Parallel.For(0, CacheClients.Count, i => { CacheClients[i].ImportDumpStage3(path); });

                // Always report the failure, whatever its type: a failed import must never look like a
                // successful one to the caller. The original stack trace is preserved.
                ExceptionDispatchInfo.Capture(e.InnerException ?? e).Throw();
            }
        }

        /// <summary>
        ///     Initialize the cluster from a dump directory: every type found in "schema.json" is registered
        ///     on each node and its objects are fed, then the sequences are resynchronized with the highest
        ///     value found in the "sequence_*.json" files.
        /// </summary>
        /// <param name="path">Dump path; it is normalized with DumpHelper.NormalizeDumpPath before use</param>
        /// <exception cref="CacheException">A node answered a type registration with an exception response</exception>
        public void InitializeFromDump(string path)
        {
            path = DumpHelper.NormalizeDumpPath(path);

            var schema = SerializationHelper.DeserializeJson<Schema>(
                File.ReadAllText(Path.Combine(path, "schema.json")));

            foreach (var typeDescription in schema.TypeDescriptions)
            {
                // register the type on every node; the position in the client list is sent as shard number
                for (var shard = 0; shard < CacheClients.Count; shard++)
                {
                    var registration = new RegisterTypeRequest(typeDescription, shard, CacheClients.Count);

                    if (CacheClients[shard].Channel.SendRequest(registration) is ExceptionResponse exResponse)
                    {
                        throw new CacheException("Error while registering a type on the server", exResponse.Message,
                            exResponse.CallStack);
                    }
                }

                FeedMany(DumpHelper.ObjectsInDump(path, typeDescription), true);
            }

            // Reinitialize the sequences. As the shard count has probably changed, every sequence restarts
            // from the maximum value found in any of the dumped sequence files.
            var maxValues = new Dictionary<string, int>();

            foreach (var file in Directory.EnumerateFiles(path, "sequence_*.json"))
            {
                var sequences = SerializationHelper.DeserializeJson<Dictionary<string, int>>(File.ReadAllText(file));

                foreach (var pair in sequences)
                {
                    if (!maxValues.TryGetValue(pair.Key, out var known) || known < pair.Value)
                        maxValues[pair.Key] = pair.Value;
                }
            }

            // resync sequences on the server
            ResyncUniqueIds(maxValues);
        }


        // Add unit test (coverage)
        /// <summary>
        ///     Run the query on every node in parallel and merge the object descriptions they return.
        ///     The order of the merged list depends on which node answers first.
        /// </summary>
        /// <param name="query">Query forwarded unchanged to every node</param>
        /// <returns>The descriptions returned by all the nodes</returns>
        public IList<CachedObject> GetObjectDescriptions(OrQuery query)
        {
            var result = new List<CachedObject>();

            try
            {
                Parallel.For(0, CacheClients.Count, i =>
                {
                    var fromOneServer = CacheClients[i].GetObjectDescriptions(query);

                    // List<T> is not safe for concurrent writers: serialize the merge
                    lock (result)
                    {
                        result.AddRange(fromOneServer);
                    }
                });
            }
            catch (AggregateException e)
            {
                // Rethrow any failure (stack trace preserved) rather than returning a partial result
                ExceptionDispatchInfo.Capture(e.InnerException ?? e).Throw();
            }

            return result;
        }

        /// <summary>
        ///     Forward the stop request to every node in parallel.
        /// </summary>
        /// <param name="restart">Value passed unchanged to each node's Stop</param>
        public void Stop(bool restart)
        {
            try
            {
                Parallel.ForEach(CacheClients, client => client.Stop(restart));
            }
            catch (AggregateException aggregate)
            {
                // report the first failure instead of the aggregate
                var cause = aggregate.InnerException;
                if (cause != null) throw cause;
            }
        }

        /// <summary>
        ///     Execute a transaction that may concern several nodes. The items are grouped by the node returned
        ///     by WhichNode. If a single node is concerned the call is delegated to that node's client;
        ///     otherwise a two stage exchange is run with the concerned nodes: wait for each node's answer to
        ///     the transaction request, then tell them to commit (all answered with a ready response) or
        ///     roll back.
        /// </summary>
        /// <param name="itemsToPut">Items to write; must have the same size as <paramref name="conditions" /></param>
        /// <param name="conditions">One query per item to put, taken at the same index</param>
        /// <param name="itemsToDelete">Optional items to delete as part of the same transaction</param>
        /// <exception cref="ArgumentException">The two lists do not have the same size</exception>
        /// <exception cref="CacheException">
        ///     At least one node did not answer the first stage with a ready response
        /// </exception>
        public void ExecuteTransaction(IList<CachedObject> itemsToPut, IList<OrQuery> conditions,
            IList<CachedObject> itemsToDelete = null)
        {
            if (itemsToPut.Count != conditions.Count)
                throw new ArgumentException($"{nameof(itemsToPut)} and {nameof(conditions)} do not have the same size");

            // the same connector will not execute transactions in parallel
            lock (_transactionSync)
            {
                // split the global transaction between servers
                var itemsByServer = new ConcurrentDictionary<int, TransactionRequest>();

                var transactionId = TransactionRequest.GenerateId();


                // index walks itemsToPut and conditions together
                var index = 0;
                foreach (var item in itemsToPut)
                {
                    var serverIndex = WhichNode(item);

                    if (!itemsByServer.ContainsKey(serverIndex))
                        itemsByServer.TryAdd(serverIndex, new TransactionRequest());

                    itemsByServer[serverIndex].ItemsToPut.Add(item);
                    itemsByServer[serverIndex].Conditions.Add(conditions[index]);
                    // NOTE(review): the transaction id is only assigned here, in the put loop. A request that
                    // is created by the delete loop below never gets it in this method - confirm that
                    // SendRequestsAndWaitForLock (which receives transactionId) sets it.
                    itemsByServer[serverIndex].TransactionId = transactionId;


                    index++;
                }

                if (itemsToDelete != null)
                    foreach (var item in itemsToDelete)
                    {
                        var serverIndex = WhichNode(item);

                        if (!itemsByServer.ContainsKey(serverIndex))
                            itemsByServer.TryAdd(serverIndex, new TransactionRequest());

                        itemsByServer[serverIndex].ItemsToDelete.Add(item);


                        // index is not read any more after this loop
                        index++;
                    }


                // Fallback to single stage if only one node is concerned
                if (itemsByServer.Count == 1)
                {
                    var server = itemsByServer.Keys.Single();

                    // the original, unsplit lists are passed: they all belong to this single node
                    CacheClients[server].ExecuteTransaction(itemsToPut, conditions, itemsToDelete);

                    TransactionStatistics.ExecutedAsSingleStage();

                    return;
                }


                // both dictionaries are keyed by the client's ShardIndex
                var sessions = new ConcurrentDictionary<int, Session>();

                var serverStatus = new ConcurrentDictionary<int, bool>();

                // select only the clients that are concerned
                var clients = CacheClients.Where(c => itemsByServer.ContainsKey(c.ShardIndex)).ToList();

                // first stage : send the transaction request to the servers and wait for them to acquire write locks

                SendRequestsAndWaitForLock(clients, itemsByServer, sessions, transactionId, serverStatus);

                Dbg.Trace($"C: proceeding with first stage transaction {transactionId} ");

                // written by the parallel loop below; if several nodes answer with an exception response
                // the value kept is the one of whichever node wrote last
                var exType = ExceptionType.Unknown;


                // first stage: the durable transaction is written in the transaction log
                Parallel.ForEach(clients, client =>
                {
                    try
                    {
                        var session = sessions[client.ShardIndex];

                        var response = client.Channel.GetResponse(session);

                        if (response is ReadyResponse)
                        {
                            serverStatus[client.ShardIndex] = true;
                        }
                        else
                        {
                            serverStatus[client.ShardIndex] = false;

                            if (response is ExceptionResponse exceptionResponse)
                                exType = exceptionResponse.ExceptionType;
                        }
                    }
                    catch (Exception)
                    {
                        // any failure while reading the answer counts as a failed first stage on this node
                        serverStatus[client.ShardIndex] = false;
                    }
                });


                // second stage: commit or rollback only the servers that processed successfully the first stage
                // (if a server answered with an exception response the transaction was already rolled back on this server)

                var firstStageOk = serverStatus.Values.All(s => s);

                if (firstStageOk)
                {
                    // commit the transaction
                    Dbg.Trace($"C: proceeding with second stage transaction {transactionId} ");

                    Parallel.ForEach(clients, client =>
                    {
                        var session = sessions[client.ShardIndex];
                        client.Channel.Continue(session, true);
                    });

                    TransactionStatistics.NewTransactionCompleted();
                }
                else
                {
                    Dbg.Trace($"C: rollback first stage transaction {transactionId} ");

                    Parallel.ForEach(clients, client =>
                    {
                        // need to rollback only the clients that have executed the first stage
                        if (serverStatus[client.ShardIndex])
                        {
                            var session = sessions[client.ShardIndex];
                            client.Channel.Continue(session, false);
                        }
                    });

                    // NOTE(review): throwing here skips the "close the session" loop below, so EndSession is
                    // never called on the rollback path - confirm that the sessions are released elsewhere.
                    throw new CacheException(
                        $"Error in two stage transaction. The transaction was successfully rolled back: {exType}", exType);
                }


                // close the session
                Parallel.ForEach(CacheClients, client =>
                {
                    if (itemsByServer.ContainsKey(client.ShardIndex))
                    {
                        var session = sessions[client.ShardIndex];
                        client.Channel.EndSession(session);
                    }
                });


            }
        }

        /// <summary>
        ///     Load the objects of a json file with DumpHelper.LoadObjects and feed them to the cluster,
        ///     passing true for the excludeFromEviction argument of FeedMany.
        /// </summary>
        /// <param name="jsonFile">File passed to DumpHelper.LoadObjects</param>
        public void Import(string jsonFile)
        {
            FeedMany(DumpHelper.LoadObjects(jsonFile, this), true);
        }

        /// <summary>
        ///     Forward a TryAdd to the node selected by WhichNode for the packed form of the item.
        /// </summary>
        /// <typeparam name="T">Type of the item; registered first if needed</typeparam>
        /// <param name="item">Item to add</param>
        /// <returns>The value returned by the selected node's TryAdd</returns>
        public bool TryAdd<T>(T item)
        {
            // the item is packed only to compute its node; the original object is what gets forwarded
            var packed = CachedObject.Pack(item, RegisterTypeIfNeeded(typeof(T)));

            var target = CacheClients[WhichNode(packed)];

            return target.TryAdd(item);
        }

        /// <summary>
        ///     Forward an UpdateIf to the node selected by WhichNode for the packed form of the new value.
        /// </summary>
        /// <typeparam name="T">Type of the item; registered first if needed</typeparam>
        /// <param name="newValue">Value forwarded to the selected node</param>
        /// <param name="testAsQuery">Query forwarded unchanged to the selected node</param>
        public void UpdateIf<T>(T newValue, OrQuery testAsQuery)
        {
            // the value is packed only to compute its node; the original object is what gets forwarded
            var packed = CachedObject.Pack(newValue, RegisterTypeIfNeeded(typeof(T)));

            var target = CacheClients[WhichNode(packed)];

            target.UpdateIf(newValue, testAsQuery);
        }

        /// <summary>
        ///     Declare the same domain on every node in parallel.
        /// </summary>
        /// <param name="domain">Domain description forwarded unchanged to every node</param>
        public void DeclareDomain(DomainDescription domain)
        {
            try
            {
                Parallel.For(0, CacheClients.Count, i => { CacheClients[i].DeclareDomain(domain); });
            }
            catch (AggregateException aggregate)
            {
                // report the first failure instead of the aggregate
                var cause = aggregate.InnerException;
                if (cause != null) throw cause;
            }
        }

        /// <summary>
        ///     Configure eviction on every node in parallel. The limit is global to the cluster, so each node
        ///     receives limit / node count; the number of items to remove is divided the same way, plus one.
        /// </summary>
        /// <param name="fullTypeName">Forwarded unchanged to every node</param>
        /// <param name="evictionType">Forwarded unchanged to every node</param>
        /// <param name="limit">Cluster-wide limit, divided by the number of nodes</param>
        /// <param name="itemsToRemove">Cluster-wide count, divided by the number of nodes then incremented</param>
        /// <param name="timeLimitInMilliseconds">Forwarded unchanged to every node</param>
        public void ConfigEviction(string fullTypeName, EvictionType evictionType, int limit, int itemsToRemove,
            int timeLimitInMilliseconds)
        {
            var nodeCount = CacheClients.Count;

            // the limit is global to the cluster
            var limitByNode = limit / nodeCount;
            var removeByNode = itemsToRemove / nodeCount + 1;

            try
            {
                Parallel.For(0, nodeCount, i =>
                {
                    CacheClients[i].ConfigEviction(fullTypeName, evictionType, limitByNode, removeByNode,
                        timeLimitInMilliseconds);
                });
            }
            catch (AggregateException aggregate)
            {
                var cause = aggregate.InnerException;
                if (cause != null) throw cause;
            }
        }

        /// <summary>
        ///     Dispose every aggregated client, one after the other.
        /// </summary>
        public void Dispose()
        {
            for (var i = 0; i < CacheClients.Count; i++)
            {
                CacheClients[i].Dispose();
            }
        }


        /// <summary>
        ///     Ask every node, in parallel, to dump to the given path.
        /// </summary>
        /// <param name="path">Path forwarded unchanged to every node</param>
        public void Dump(string path)
        {
            try
            {
                Parallel.ForEach(CacheClients, client => client.Dump(path));
            }
            catch (AggregateException aggregate)
            {
                // report the first failure instead of the aggregate
                var cause = aggregate.InnerException;
                if (cause != null) throw cause;
            }
        }

        /// <summary>
        ///     Add or replace an existing item. The item is packed, wrapped in a put request and sent to
        ///     the node selected by the sharding algorithm (WhichNode).
        /// </summary>
        /// <typeparam name="T">Type of the item; registered first if needed</typeparam>
        /// <param name="item">Item to store; must not be null</param>
        /// <param name="excludeFromEviction">Copied to the request's ExcludeFromEviction flag</param>
        /// <exception cref="ArgumentNullException"><paramref name="item" /> is null</exception>
        /// <exception cref="CacheException">The node answered with an exception response</exception>
        public void Put<T>(T item, bool excludeFromEviction = false)
        {
            if (item == null) throw new ArgumentNullException(nameof(item));

            var packedItem = CachedObject.Pack(item, RegisterTypeIfNeeded(typeof(T)));

            var request = new PutRequest(typeof(T)) {ExcludeFromEviction = excludeFromEviction};
            request.Items.Add(packedItem);

            // select the node using a sharding algorithm
            var target = CacheClients[WhichNode(packedItem)];

            if (target.Channel.SendRequest(request) is ExceptionResponse failure)
            {
                throw new CacheException("Error while writing an object to the cache", failure.Message,
                    failure.CallStack);
            }
        }


        /// <summary>
        ///     Update or insert multiple objects. The items are packed and grouped by node, then exactly one
        ///     put request per node is sent, all nodes in parallel (a node that received no item still gets
        ///     its, empty, request).
        /// </summary>
        /// <typeparam name="T">Type of the items; registered first if needed</typeparam>
        /// <param name="items">Items to store; must not be null</param>
        /// <param name="excludeFromEviction">Copied to the ExcludeFromEviction flag of every request</param>
        /// <exception cref="ArgumentNullException"><paramref name="items" /> is null</exception>
        /// <exception cref="CacheException">A node answered with an exception response</exception>
        public void PutMany<T>(IEnumerable<T> items, bool excludeFromEviction = false)
        {
            if (items == null) throw new ArgumentNullException(nameof(items));

            var itemType = typeof(T);
            var description = RegisterTypeIfNeeded(itemType);

            // initialize one request per node
            var requests = Enumerable.Range(0, CacheClients.Count)
                .Select(_ => new PutRequest(itemType) {ExcludeFromEviction = excludeFromEviction})
                .ToArray();

            // dispatch every packed item to the request of its node
            foreach (var item in items)
            {
                var packedItem = CachedObject.Pack(item, description);
                requests[WhichNode(packedItem)].Items.Add(packedItem);
            }

            try
            {
                Parallel.For(0, CacheClients.Count, i =>
                {
                    if (CacheClients[i].Channel.SendRequest(requests[i]) is ExceptionResponse failure)
                    {
                        throw new CacheException(
                            "Error while writing an object to the cache",
                            failure.Message, failure.CallStack);
                    }
                });
            }
            catch (AggregateException aggregate)
            {
                var cause = aggregate.InnerException;
                if (cause != null) throw cause;
            }
        }


        /// <summary>
        ///     Feed many items using the default packet size (CacheClient.DefaultPacketSize).
        ///     FeedMany is non transactional but it does not lock the cache. To be used for massive feeds
        /// </summary>
        /// <typeparam name="T">Type of the items</typeparam>
        /// <param name="items">Items to feed</param>
        /// <param name="excludeFromEviction">Forwarded unchanged to the three parameter overload</param>
        public void FeedMany<T>(IEnumerable<T> items, bool excludeFromEviction = false) =>
            FeedMany(items, excludeFromEviction, CacheClient.DefaultPacketSize);


        /// <summary>
        ///     Register a type on every node, one node after the other.
        /// </summary>
        /// <param name="type">Forwarded unchanged to every node</param>
        /// <param name="typeDescription">Forwarded unchanged to every node</param>
        /// <param name="forceReindex">
        ///     NOTE(review): this flag is not forwarded to the nodes and is not read anywhere in this
        ///     method - confirm whether that is intended.
        /// </param>
        /// <returns>The description returned by the last node, or null if there is no node</returns>
        public ClientSideTypeDescription RegisterType(Type type, ClientSideTypeDescription typeDescription, bool forceReindex = false)
        {
            ClientSideTypeDescription lastAnswer = null;

            for (var i = 0; i < CacheClients.Count; i++)
            {
                lastAnswer = CacheClients[i].RegisterType(type, typeDescription);
            }

            return lastAnswer;
        }

        /// <summary>
        ///     Evaluate a query on every node in parallel and aggregate the answers.
        /// </summary>
        /// <param name="query">Query forwarded unchanged to every node</param>
        /// <returns>
        ///     Key: true only if every node answered true; Value: the sum of the values returned by the
        ///     nodes, capped to query.Take when Take is positive
        /// </returns>
        public KeyValuePair<bool, int> EvalQuery(OrQuery query)
        {
            try
            {
                var total = 0;

                // 1 while no node has reported an incomplete result. It used to be a bool initialized to
                // true and combined with |=, which could never become false (and was not thread safe).
                var allComplete = 1;


                Parallel.For(0, CacheClients.Count, i =>
                {
                    var response =
                        CacheClients[i].EvalQuery(query);
                    // ReSharper disable once AccessToModifiedClosure
                    Interlocked.Add(ref total, response.Value);

                    // ReSharper disable once AccessToModifiedClosure
                    if (!response.Key) Interlocked.Exchange(ref allComplete, 0);
                });

                if (query.Take > 0 && total > query.Take) total = query.Take;
                return new KeyValuePair<bool, int>(allComplete == 1, total);
            }
            catch (AggregateException e)
            {
                if (e.InnerException != null) throw e.InnerException;

                return new KeyValuePair<bool, int>(true, 0);
            }
        }

        /// <summary>
        ///     FeedMany is non transactional but it does not lock the cache. To be used for massive feeds.
        ///     Can feed with business objects (they are packed to CachedObject) or directly with CachedObject.
        ///     Items are accumulated in one packet per node; a packet is sent on a background task as soon as
        ///     it holds <paramref name="packetSize" /> items and the remaining packets are sent at the end,
        ///     flagged as end of session.
        /// </summary>
        /// <typeparam name="T">Type of the items; registered first if it is not CachedObject</typeparam>
        /// <param name="items">Items to feed; must not be null</param>
        /// <param name="excludeFromEviction">Copied to the ExcludeFromEviction flag of every packet</param>
        /// <param name="packetSize">Number of items that triggers the sending of a packet</param>
        /// <exception cref="ArgumentNullException"><paramref name="items" /> is null</exception>
        /// <exception cref="CacheException">A node answered with an exception response</exception>
        public void FeedMany<T>(IEnumerable<T> items, bool excludeFromEviction, int packetSize)
        {
            if (items == null) throw new ArgumentNullException(nameof(items));

            var itemType = typeof(T);
            var needPack = itemType != typeof(CachedObject);

            // all the packets of this call share the same session id
            var sessionId = Guid.NewGuid().ToString();

            // Lazy type registration (only needed when business objects have to be packed)
            ClientSideTypeDescription description = null;
            string typeName = null;
            if (needPack)
            {
                description = RegisterTypeIfNeeded(itemType);
                typeName = description.FullTypeName;
            }

            // the packet currently being filled for each node (null until the node receives its first item)
            var pending = new PutRequest[CacheClients.Count];

            // Parallelize both nodes and requests (multiple requests for the same node are executed in
            // parallel if multiple connections are available in the pool)
            var sendTasks = new List<Task>();

            PutRequest NewPacket() => new PutRequest(typeName)
            {
                ExcludeFromEviction = excludeFromEviction,
                SessionId = sessionId
            };

            foreach (var item in items)
            {
                // when feeding CachedObject directly the type name comes from the first item
                if (typeName == null) typeName = (item as CachedObject)?.FullTypeName;

                var packedItem = needPack ? CachedObject.Pack(item, description) : item as CachedObject;

                var node = WhichNode(packedItem);

                var packet = pending[node] ?? (pending[node] = NewPacket());
                packet.Items.Add(packedItem);

                if (packet.Items.Count != packetSize) continue;

                // the packet is full: send it in the background and start a new one for this node
                sendTasks.Add(Task.Factory.StartNew(state =>
                {
                    foreach (var part in ((PutRequest) state).SplitWithMaxSize())
                    {
                        if (CacheClients[node].Channel.SendRequest(part) is ExceptionResponse failure)
                        {
                            throw new CacheException(
                                "Error while writing an object to the cache",
                                failure.Message, failure.CallStack);
                        }
                    }
                }, packet));

                pending[node] = NewPacket();
            }

            //send the last packet left for each node
            for (var node = 0; node < CacheClients.Count; node++)
            {
                var lastPacket = pending[node];
                if (lastPacket == null) continue;

                lastPacket.EndOfSession = true;

                var target = node; // copy to avoid modified closure
                sendTasks.Add(Task.Factory.StartNew(state =>
                {
                    if (CacheClients[target].Channel.SendRequest((PutRequest) state) is ExceptionResponse failure)
                    {
                        throw new CacheException(
                            "Error while writing an object to the cache",
                            failure.Message, failure.CallStack);
                    }
                }, lastPacket));
            }

            try
            {
                Task.WaitAll(sendTasks.ToArray());
                Dbg.Trace($"{sendTasks.Count} tasks finished");
            }
            catch (AggregateException aggregate)
            {
                var cause = aggregate.InnerException;
                if (cause != null) throw cause;
            }
        }


        /// <summary>
        ///     Start a feed session. It is used to pre load the cache with large amounts of data.
        ///     The item type is registered if needed before the session is created.
        /// </summary>
        /// <typeparam name="TItem">Type of the items that will be fed</typeparam>
        /// <param name="packetSize">Passed to the session together with the number of nodes</param>
        /// <param name="excludeFromEviction">Passed unchanged to the session</param>
        /// <returns>A new <see cref="ParallelFeedSession{TItem}" /></returns>
        public IFeedSession BeginFeed<TItem>(int packetSize, bool excludeFromEviction = true) where TItem : class
        {
            RegisterTypeIfNeeded(typeof(TItem));

            var nodeCount = CacheClients.Count;

            return new ParallelFeedSession<TItem>(nodeCount, packetSize, excludeFromEviction);
        }


        /// <summary>
        ///     Add an element to the cache during a feed session. The item is packed and appended to the
        ///     packet of its node; a packet is sent to the server (on a background task) only when it
        ///     reaches the packet size of the session.
        /// </summary>
        /// <typeparam name="TItem">Type of the item; must match the type used to open the session</typeparam>
        /// <param name="session">Session returned by BeginFeed</param>
        /// <param name="item">Item to add</param>
        /// <exception cref="CacheException">The session is already closed</exception>
        public void Add<TItem>(IFeedSession session, TItem item) where TItem : class
        {
            var sessionImplementation = (ParallelFeedSession<TItem>) session;

            if (sessionImplementation.IsClosed)
                throw new CacheException("The feed session is closed");

            var description = RegisterTypeIfNeeded(typeof(TItem));

            var packedItem = CachedObject.Pack(item, description);

            var node = WhichNode(packedItem);

            var request = sessionImplementation.Requests[node];
            request.Items.Add(packedItem);

            // for each node fill a packet of fixed size and send it to the server only when completed
            if (request.Items.Count != sessionImplementation.PacketSize) return;

            // Create a new empty packet for the future objects. It inherits the eviction flag of the packet
            // it replaces: a bare PutRequest would silently drop that setting for every packet after the
            // first one. (Presumably the session creates its initial packets with the flag given to
            // BeginFeed - verify against ParallelFeedSession.)
            sessionImplementation.Requests[node] = new PutRequest(typeof(TItem))
            {
                ExcludeFromEviction = request.ExcludeFromEviction
            };

            var task = Task.Factory.StartNew(re =>
            {
                var response = CacheClients[node].Channel.SendRequest((Request) re);
                if (response is ExceptionResponse exResponse)
                    throw new CacheException(
                        "Error while writing an object to the cache",
                        exResponse.Message, exResponse.CallStack);
            }, request);

            sessionImplementation.AddTask(task);
        }


        /// <summary>
        ///     Close a feed session. Send all the remaining data and wait for all asynchronous operations to
        ///     finish. The session is marked as closed even when one of the operations failed.
        /// </summary>
        /// <typeparam name="TItem">Type of the items; must match the type used to open the session</typeparam>
        /// <param name="session">Session returned by BeginFeed</param>
        /// <exception cref="CacheException">
        ///     The session is already closed, or a node answered with an exception response
        /// </exception>
        public void EndFeed<TItem>(IFeedSession session) where TItem : class
        {
            var sessionImplementation = (ParallelFeedSession<TItem>) session;

            if (sessionImplementation.IsClosed)
                throw new CacheException("The feed session is closed");


            //send the last packet left for each node
            for (var node = 0; node < CacheClients.Count; node++)
            {
                var request = sessionImplementation.Requests[node];

                if (request != null && request.Items.Count > 0)
                {
                    var n = node; // copy to avoid modified closure
                    var task = Task.Factory.StartNew(re =>
                    {
                        var response = CacheClients[n].Channel.SendRequest((Request) re);
                        if (response is ExceptionResponse exResponse)
                            throw new CacheException(
                                "Error while writing an object to the cache",
                                exResponse.Message, exResponse.CallStack);
                    }, request);

                    sessionImplementation.AddTask(task);
                }
            }

            try
            {
                sessionImplementation.WaitForAll();
            }
            catch (AggregateException e)
            {
                if (e.InnerException != null) throw e.InnerException;
            }
            finally
            {
                // Close the session on failure too. Left open, a second EndFeed would send the remaining
                // packets again (they are still referenced by the session).
                sessionImplementation.IsClosed = true;
            }
        }

        /// <summary>
        ///     Run RemoveMany with the same query on every node in parallel.
        /// </summary>
        /// <param name="query">Query forwarded unchanged to every node</param>
        /// <returns>The sum of the values returned by the nodes</returns>
        public int RemoveMany(OrQuery query)
        {
            var removedTotal = 0;

            try
            {
                Parallel.For(0, CacheClients.Count, i =>
                {
                    Interlocked.Add(ref removedTotal, CacheClients[i].RemoveMany(query));
                });
            }
            catch (AggregateException aggregate)
            {
                var cause = aggregate.InnerException;
                if (cause != null) throw cause;
            }

            return removedTotal;
        }

        /// <summary>
        ///     Remove all data from a table by calling Truncate on every node in parallel.
        ///     Much faster than <see cref="RemoveMany" />
        /// </summary>
        /// <typeparam name="TItem">Type forwarded to each node's Truncate</typeparam>
        /// <returns>The sum of the values returned by the nodes</returns>
        public int Truncate<TItem>()
        {
            var removedTotal = 0;

            try
            {
                Parallel.For(0, CacheClients.Count, i =>
                {
                    Interlocked.Add(ref removedTotal, CacheClients[i].Truncate<TItem>());
                });
            }
            catch (AggregateException aggregate)
            {
                var cause = aggregate.InnerException;
                if (cause != null) throw cause;
            }

            return removedTotal;
        }


        /// <summary>
        ///     Try to get an object by primary key. Only the node selected by WhichNode for the key value
        ///     found in the generated query is asked.
        /// </summary>
        /// <typeparam name="TItemType">Type of the requested object</typeparam>
        /// <param name="value">Value passed to QueryBuilder.GetOne</param>
        /// <returns>The first object returned by the node, or the default value if it returned none</returns>
        public TItemType GetOne<TItemType>(object value)
        {
            var query = new QueryBuilder(typeof(TItemType)).GetOne(value);

            // the key is read back from the first element of the first sub-query
            var node = WhichNode(query.Elements[0].Elements[0].Value);

            return CacheClients[node].GetMany<TItemType>(query).FirstOrDefault();
        }

        /// <summary>
        ///     Try to get one element by unique key. Every node is queried in parallel
        ///     and the non default answer (if any) is returned
        /// </summary>
        /// <typeparam name="TItemType">type of the requested item</typeparam>
        /// <param name="keyName">name of the unique key</param>
        /// <param name="value">value of the unique key</param>
        /// <returns>the item found or default(TItemType) if no node returned one</returns>
        public TItemType GetOne<TItemType>(string keyName, object value)
        {
            var result = default(TItemType);
            var comparer = EqualityComparer<TItemType>.Default;

            try
            {
                Parallel.ForEach(CacheClients, client =>
                {
                    // query the node itself: calling the aggregator overload here recursed endlessly
                    var oneResult = client.GetOne<TItemType>(keyName, value);

                    // null-safe test; oneResult.Equals(...) threw when a node returned null
                    if (!comparer.Equals(oneResult, default(TItemType))) result = oneResult;
                });
            }
            catch (AggregateException e)
            {
                // same convention as the other methods of this class: expose the first node failure
                if (e.InnerException != null) throw e.InnerException;
            }

            return result;
        }


        /// <summary>
        ///     Get the items matching a query from all the nodes.
        ///     In some cases we need the order of returned elements to be stable between calls
        ///     (for example if using the Take or Skip operators). For this reason the results of a
        ///     normal query are interleaved: one item from each node, always visiting the nodes by shard index.
        ///     The results of a full text query are merged and returned by descending rank.
        ///     This is an iterator: nothing is executed before the enumeration starts
        /// </summary>
        /// <typeparam name="TItemType">type of the requested items</typeparam>
        /// <param name="query">query forwarded unchanged to each node</param>
        /// <returns>lazy sequence of the matching items</returns>
        public IEnumerable<TItemType> GetMany<TItemType>(OrQuery query)
        {
            // one enumerator per node, stored at the shard index of the node
            var perShard = new IEnumerator<RankedItem>[CacheClients.Count];

            try
            {
                Parallel.ForEach(CacheClients, node =>
                {
                    perShard[node.ShardIndex] = node.GetManyWithRank<TItemType>(query).GetEnumerator();
                });
            }
            catch (AggregateException aggregate)
            {
                var cause = aggregate.InnerException;
                if (cause != null)
                {
                    throw cause;
                }
            }

            if (query.IsFullTextQuery)
            {
                // for full text queries we have to merge all results and sort by rank
                var merged = new List<RankedItem>();

                Parallel.ForEach(perShard, cursor =>
                {
                    // drain the node into a private list so the lock is taken only once per node
                    var fromThisNode = new List<RankedItem>();
                    while (cursor.MoveNext())
                    {
                        fromThisNode.Add(cursor.Current);
                    }

                    lock (merged)
                    {
                        merged.AddRange(fromThisNode);
                    }
                });

                var byRank = merged
                    .OrderByDescending(ranked => ranked.Rank)
                    .Select(ranked => ranked.Item)
                    .Cast<TItemType>();

                foreach (var item in byRank)
                {
                    yield return item;
                }

                yield break;
            }

            // normal query: take one item from each node in turn until all the nodes are exhausted
            var returned = 0;
            bool progressed;

            do
            {
                progressed = false;

                foreach (var cursor in perShard)
                {
                    // limit the number of returned object if Take linq extension method was used
                    if (query.Take > 0 && returned >= query.Take)
                    {
                        yield break;
                    }

                    if (!cursor.MoveNext())
                    {
                        continue;
                    }

                    progressed = true;
                    yield return (TItemType) cursor.Current?.Item;
                    returned++;
                }
            } while (progressed);
        }


        /// <summary>
        ///     Try to remove an item by primary key. The request is sent only to the node
        ///     selected by <see cref="WhichNode(KeyValue)" />
        /// </summary>
        /// <typeparam name="TItemType">type of the item to remove</typeparam>
        /// <param name="primaryKeyValue">
        ///     either a <see cref="KeyValue" /> or a raw value which is converted to a primary key
        /// </param>
        public void Remove<TItemType>(object primaryKeyValue)
        {
            var keyFactory = new QueryBuilder(typeof(TItemType));

            // accept an already built key as well as a raw value
            KeyValue shardingKey = primaryKeyValue is KeyValue alreadyBuilt
                ? alreadyBuilt
                : keyFactory.MakePrimaryKeyValue(primaryKeyValue);

            var shard = WhichNode(shardingKey);

            // the original argument (not the converted key) is forwarded to the node
            CacheClients[shard].Remove<TItemType>(primaryKeyValue);
        }

        /// <summary>
        ///     Forward to all the nodes, in parallel, the declaration that data was (or was not) fully loaded
        /// </summary>
        /// <typeparam name="TItemType">type of the items concerned by the declaration</typeparam>
        /// <param name="isFullyLoaded">flag forwarded unchanged to each node</param>
        public void DeclareDataFullyLoaded<TItemType>(bool isFullyLoaded)
        {
            try
            {
                Parallel.ForEach(CacheClients, node =>
                {
                    node.DeclareDataFullyLoaded<TItemType>(isFullyLoaded);
                });
            }
            catch (AggregateException aggregate)
            {
                // expose the first failure of a node instead of the wrapper
                var cause = aggregate.InnerException;
                if (cause != null)
                {
                    throw cause;
                }
            }
        }


        /// <summary>
        ///     Check that data is fully loaded on all the nodes for a specific data type
        /// </summary>
        /// <typeparam name="TItemType">type of the items to check</typeparam>
        /// <returns>true if every node answered true (also true if there is no node)</returns>
        public bool IsDataFullyLoaded<TItemType>()
        {
            // nodes answer concurrently so a thread-safe collection gathers the answers
            var loadedFlags = new ConcurrentBag<bool>();

            try
            {
                Parallel.ForEach(CacheClients, node => { loadedFlags.Add(node.IsDataFullyLoaded<TItemType>()); });
            }
            catch (AggregateException aggregate)
            {
                // expose the first failure of a node instead of the wrapper
                var cause = aggregate.InnerException;
                if (cause != null)
                {
                    throw cause;
                }
            }

            return loadedFlags.All(loaded => loaded);
        }


        /// <summary>
        ///     Check that all the nodes are up and running. The nodes are pinged in parallel
        /// </summary>
        /// <returns>true if every node answered true (also true if there is no node)</returns>
        public bool Ping()
        {
            // nodes answer concurrently so a thread-safe collection gathers the answers
            var pingResults = new ConcurrentBag<bool>();

            try
            {
                Parallel.ForEach(CacheClients, node => { pingResults.Add(node.Ping()); });
            }
            catch (AggregateException aggregate)
            {
                // expose the first failure of a node instead of the wrapper
                var cause = aggregate.InnerException;
                if (cause != null)
                {
                    throw cause;
                }
            }

            return pingResults.All(alive => alive);
        }

        /// <summary>
        ///     Register a type on all the nodes, in parallel
        /// </summary>
        /// <param name="type">type to register</param>
        /// <param name="forceReindex">flag forwarded unchanged to each node</param>
        /// <returns>
        ///     the description returned by one of the nodes (whichever assigned it last), null if there is no node
        /// </returns>
        public ClientSideTypeDescription RegisterType(Type type, bool forceReindex = false)
        {
            ClientSideTypeDescription registered = null;

            Parallel.ForEach(CacheClients, node =>
            {
                var fromThisNode = node.RegisterType(type, forceReindex);

                // every node overwrites the same variable; the last writer wins
                registered = fromThisNode;
            });

            return registered;
        }

        /// <summary>
        ///     Call RegisterTypeIfNeeded on each node, one after the other
        /// </summary>
        /// <param name="type">type to register</param>
        /// <returns>the description returned by the last node, null if there is no node</returns>
        public ClientSideTypeDescription RegisterTypeIfNeeded(Type type)
        {
            ClientSideTypeDescription fromLastNode = null;

            foreach (var node in CacheClients)
            {
                fromLastNode = node.RegisterTypeIfNeeded(type);
            }

            return fromLastNode;
        }

        /// <summary>
        ///     Call RegisterTypeIfNeeded with an explicit description on each node, one after the other
        /// </summary>
        /// <param name="type">type to register</param>
        /// <param name="description">configuration forwarded unchanged to each node</param>
        /// <returns>the description returned by the last node, null if there is no node</returns>
        public ClientSideTypeDescription RegisterTypeIfNeeded(Type type, TypeDescriptionConfig description)
        {
            ClientSideTypeDescription fromLastNode = null;

            foreach (var node in CacheClients)
            {
                fromLastNode = node.RegisterTypeIfNeeded(type, description);
            }

            return fromLastNode;
        }

        /// <summary>
        ///     Call ResyncUniqueIds on all the nodes, in parallel
        /// </summary>
        /// <param name="newValues">values forwarded unchanged to each node</param>
        public void ResyncUniqueIds(IDictionary<string, int> newValues)
        {
            try
            {
                Parallel.For(0, CacheClients.Count, i => { CacheClients[i].ResyncUniqueIds(newValues); });
            }
            catch (AggregateException e)
            {
                // Expose the first node failure whatever its type, like the other methods of this class.
                // Previously only CacheException was rethrown and any other failure was silently dropped,
                // so the caller could not know that a node was not resynchronized
                if (e.InnerException != null) throw e.InnerException;
            }
        }


        /// <summary>
        ///     First stage of a two stage transaction: send a request to each server and wait until
        ///     all of them answered with a <see cref="ReadyResponse" />. While at least one server
        ///     did not, a rollback is sent to the servers that were ready and the whole exchange is retried
        ///     after a random delay. The method returns only when all the servers are ready
        ///     (there is no limit on the number of attempts)
        /// </summary>
        /// <typeparam name="TRequest">type of the request sent to the servers</typeparam>
        /// <param name="clients">the clients taking part in the transaction</param>
        /// <param name="requestByServer">the request to send to each server, by shard index</param>
        /// <param name="sessions">filled with the session opened on each server, by shard index</param>
        /// <param name="transactionId">used only in the trace messages</param>
        /// <param name="serverStatus">filled with the outcome for each server, by shard index (true = ready)</param>
        private void SendRequestsAndWaitForLock<TRequest>(List<CacheClient> clients,
            ConcurrentDictionary<int, TRequest> requestByServer, ConcurrentDictionary<int, Session> sessions,
            string transactionId, ConcurrentDictionary<int, bool> serverStatus) where TRequest : Request
        {
            var locksOk = false;

            var iteration = 0;


            while (!locksOk)
            {
                try
                {
                    // random delay whose upper bound (10 * iteration) grows with each retry
                    // NOTE(review): presumably behaves like System.Random.Next(maxValue), so no delay on the first attempt - confirm
                    var delay = ThreadLocalRandom.Instance.Next(10 * iteration);

                    TransactionStatistics.Retries(iteration + 1);


                    Dbg.Trace(
                        $"C: delay = {delay} for iteration {iteration} transaction {transactionId} connector {GetHashCode()}");

                    if (delay > 0)
                        Thread.Sleep(delay);

                    // send transaction requests
                    Parallel.ForEach(clients, client =>
                    {
                        // this lookup is outside the inner try: a missing entry is not caught by it
                        var request = requestByServer[client.ShardIndex];


                        try
                        {
                            // a new session is opened at each attempt and replaces the previous one in the dictionary
                            var session = client.Channel.BeginSession();
                            sessions[client.ShardIndex] = session;

                            Dbg.Trace(
                                $"C: Sending transaction request to server {client.ShardIndex} transaction {transactionId} connector {GetHashCode()}");
                            client.Channel.PushRequest(session, request);
                        }
                        catch (Exception e)
                        {
                            // here if communication exception
                            serverStatus[client.ShardIndex] = false;

                            Dbg.Trace($"C: Exception while sending request to server {client.ShardIndex}:{e.Message}");
                        }
                    });

                    // wait for servers to acquire lock
                    Parallel.ForEach(clients, client =>
                    {
                        try
                        {
                            var session = sessions[client.ShardIndex];

                            // only a ReadyResponse counts as success; any other answer marks the server as not ready
                            var answer = client.Channel.GetResponse(session);
                            if (answer is ReadyResponse)
                                serverStatus[client.ShardIndex] = true;
                            else
                                serverStatus[client.ShardIndex] = false;
                        }
                        catch (Exception e)
                        {
                            // here if communication exception
                            // (also reached if no session was stored for this server)
                            serverStatus[client.ShardIndex] = false;

                            Dbg.Trace($"C: Exception while sending request to server {client.ShardIndex}:{e.Message}");
                        }
                    });
                }
                catch (AggregateException e)
                {
                    // this code should never be reached
                    // (the lambdas above catch every exception except the one of the requestByServer lookup)
                    throw new CacheException(
                        $"Error in the first stage of a two stage transaction:{e.InnerExceptions.First()}");
                }

                // success only if every server is ready
                locksOk = serverStatus.Values.All(s => s);

                Dbg.Trace(!locksOk
                    ? $"C: Failed to acquire lock for transaction {transactionId}. retrying "
                    : $"C: Lock acquired for all servers: transaction {transactionId} ");

                // These calls are outside the try block: a failure here propagates to the caller
                // as an AggregateException
                if (locksOk == false)
                    // rollback only the servers that were ready; the others receive nothing
                    Parallel.ForEach(clients, client =>
                    {
                        if (serverStatus[client.ShardIndex])
                        {
                            var session = sessions[client.ShardIndex];
                            client.Channel.PushRequest(session, new ContinueRequest {Rollback = true});
                        }
                    });
                else
                    // all the servers are ready: tell each of them to continue
                    Parallel.ForEach(clients, client =>
                    {
                        var session = sessions[client.ShardIndex];
                        client.Channel.PushRequest(session, new ContinueRequest {Rollback = false});
                    });

                iteration++;

                // called after every attempt, including the successful one
                TransactionStatistics.NewAttemptToLock();
            }
        }

        /// <summary>
        ///     Index of the node in charge of an object. It depends only on the primary key of the object
        /// </summary>
        /// <param name="obj">object whose primary key is used</param>
        /// <returns>index in <see cref="CacheClients" /></returns>
        private int WhichNode(CachedObject obj) => WhichNode(obj.PrimaryKey);

        /// <summary>
        ///     Index of the node in charge of a primary key: the hash code of the key
        ///     modulo the number of nodes, always in the range [0, CacheClients.Count)
        /// </summary>
        /// <param name="primaryKey">must be a primary key</param>
        /// <returns>index in <see cref="CacheClients" /></returns>
        /// <exception cref="NotSupportedException">the key is not a primary key</exception>
        private int WhichNode(KeyValue primaryKey)
        {
            if (primaryKey.KeyType != KeyType.Primary)
                throw new NotSupportedException("Sharding may be applied only to primary keys");

            var nodes = CacheClients.Count;

            // In C# the result of % takes the sign of the left operand, so a negative hash code
            // gave a negative (invalid) index. Results for non negative hash codes are unchanged
            var remainder = primaryKey.GetHashCode() % nodes;

            return remainder < 0 ? remainder + nodes : remainder;
        }
    }
}